spark 操作redis 您所在的位置:网站首页 python redis 读取set spark 操作redis

spark 操作redis

2023-07-31 20:54| 来源: 网络整理| 查看: 265

依赖库

spark 操作redis的时候,依赖的库是spark-redis

首先我们导入依赖

com.redislabs spark-redis_2.11 2.4.2 spark-redis 参数设置

首先初始化一个spark实例,spark-redis的参数在config中进行配置。

import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession def main(args: Array[String]): Unit = { val conf: SparkConf =new SparkConf().setAppName("setRedis").setMaster("local[2]") conf.set("redis.host", "localhost") //redis 主机节点 conf.set("redis.port", "6379") //端口号,不填默认为6379 val session: SparkSession =SparkSession.builder().config(conf).getOrCreate() val sc: SparkContext =session.sparkContext }

还可以设置一些额外的参数

conf.set("redis.auth","null") //用户权限配置 conf.set("redis.db","0") //数据库设置 conf.set("redis.timeout","2000") //设置连接超时时间 简单使用

sc通过导入的隐式转换可以调出的读取Redis的方法,都是以fromRedis开头的,都是redis可以存储的数据结构,这里以常见的KV进行示例

import com.redislabs.provider.redis._ 读取Redis

通过fromRedisKV方法,获取一个键的值。

val rdd: RDD[(String, String)] =sc.fromRedisKV("a") rdd.collect().foreach(println(_)) /** * (a,1) */

fromRedisKV()的源码:

/** * @param keysOrKeyPattern an array of keys or a key pattern * @param partitionNum number of partitions * @return RedisKVRDD of simple Key-Values stored in redis server */ def fromRedisKV[T](keysOrKeyPattern: T, partitionNum: Int = 3) (implicit redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf), readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)): RDD[(String, String)] = { keysOrKeyPattern match { case keyPattern: String => fromRedisKeyPattern(keyPattern, partitionNum).getKV() case keys: Array[String] => fromRedisKeys(keys, partitionNum).getKV() case _ => throw new scala.Exception(IncorrectKeysOrKeyPatternMsg) } }

fromRedisKV()的参数:

泛类型 keysOrKeyPattern 从match case 模式匹配代码中可以看出,这里的T可是是两种类型,一个是String,另一个是Array[String],如果传入其他类型则会抛出运行时异常,其中String类型的意思是匹配键,这里可以用通配符比如foo*,所以返回值是一个结果集RDD[(String, String)],当参数类型为Array[String]时是指传入key的数组,返回的结果则为相应的的结果集,RDD的内容类型也是KV形式。

Ine类型 partitionNum 生成RDD的分区数,默认为3,可以根据实际情况进行更改防止数据过度倾斜。

柯里化形式隐式参数 redisConfig 由于我们之前在sparkConf里面set了相应的参数,这里不传入这个参数即可。如要调整,则可以按照源码中的方式传入,其中RedisEndpoint是一个case class类,而且很多参数都有默认值(比如6379的端口号),所以自己建立一个RedisEndpoint也是非常方便的。

写入Redis

通过toRedisKV将数据写入redis

val data: Seq[(String, String)] = Seq[(String,String)](("high","111"), ("abc","222"), ("together","333")) val redisData:RDD[(String,String)] = sc.parallelize(data) sc.toRedisKV(redisData)

查看redis

127.0.0.1:6379> keys * 1) "high" 2) "together" 3) "abc" 127.0.0.1:6379>

toRedisKV()的源码:

/** * @param kvs Pair RDD of K/V * @param ttl time to live */ def toRedisKV(kvs: RDD[(String, String)], ttl: Int = 0) (implicit redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf), readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)) { kvs.foreachPartition(partition => setKVs(partition, ttl, redisConfig, readWriteConfig)) }

toRedisKV()的参数

kv类型的RDD kvs是一个键值对类型的RDD,键和值的类型都是String类型

Int类型的ttl ttl是存入数据的过期时间,单位是秒

以上就是spark读写redis的两个常用的方法。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有